package com.ucar.datalink.worker.core.runtime.coordinate;

import com.ucar.datalink.common.errors.DatalinkException;
import com.ucar.datalink.worker.core.runtime.TaskConfigManager;
import com.ucar.datalink.worker.core.runtime.WorkerConfig;
import org.apache.kafka.clients.*;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Time;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * This class manages the coordination process with datalink-managers for the Datalink cluster group membership. It ties together
 * the Coordinator, which implements the group member protocol, with all the other pieces needed to drive the connection
 * to the group coordinator manager. This isolates all the networking to a single thread managed by this class, with
 * higher level operations in response to group membership events being handled by the keeper.
 */
public class WorkerGroupMember {
    private static final Logger log = LoggerFactory.getLogger(WorkerGroupMember.class);

    private static final AtomicInteger DATALINK_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "datalink.worker";

    private final Time time;
    private final String clientId;
    private final ConsumerNetworkClient client;
    private final Metrics metrics;
    private final Metadata metadata;
    private final long retryBackoffMs;
    private final WorkerCoordinator coordinator;

    private boolean stopped = false;

    public WorkerGroupMember(WorkerConfig config,
                             String restUrl,
                             TaskConfigManager jobTaskConfigManager,
                             WorkerRebalanceListener listener,
                             Time time) {
        try {
            this.time = time;

            String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
            clientId = clientIdConfig.length() <= 0 ? "datalink-worker-" + DATALINK_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
            Map<String, String> metricsTags = new LinkedHashMap<>();
            metricsTags.put("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .tags(metricsTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG));
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), 0);
            String metricGrpPrefix = "datalink.worker";
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            NetworkClient netClient = new NetworkClient(
                    new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                    this.metadata,
                    clientId,
                    100, // a fixed large enough value will suffice
                    config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
                    config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
                    config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                    config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)){
                @Override
                public boolean awaitMetadataUpdate(long timeout) {
                    metadata.update(Cluster.bootstrap(addresses),time.milliseconds());
                    return super.awaitMetadataUpdate(timeout);
                }
            };
            this.coordinator = new WorkerCoordinator(this.client,
                    config.getString(WorkerConfig.GROUP_ID_CONFIG),
                    config.getInt(WorkerConfig.REBALANCE_TIMEOUT_MS_CONFIG),
                    config.getInt(WorkerConfig.SESSION_TIMEOUT_MS_CONFIG),
                    config.getInt(WorkerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                    metrics,
                    metricGrpPrefix,
                    this.time,
                    retryBackoffMs,
                    restUrl,
                    jobTaskConfigManager,
                    listener);

            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
            log.debug("datalink worker group member created");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak.
            stop(true);
            // now propagate the errors
            throw new DatalinkException("Failed to construct datalink worker", t);
        }
    }

    public void stop() {
        if (stopped) {
            return;
        }
        stop(false);
    }

    public void ensureActive() {
        coordinator.poll(0);
    }

    public void poll(long timeout) {
        if (timeout < 0) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }
        coordinator.poll(timeout);
    }

    /**
     * Interrupt any running poll() calls, causing a WakeupException to be thrown in the thread invoking that method.
     */
    public void wakeup() {
        this.client.wakeup();
    }

    /**
     * Get the member ID of this worker in the group of workers.
     * <p>
     * This ID is the unique member ID automatically generated.
     *
     * @return the member ID
     */
    public String memberId() {
        return coordinator.memberId();
    }

    public void requestRejoin() {
        coordinator.requestRejoin();
    }

    public void maybeLeaveGroup() {
        coordinator.maybeLeaveGroup();
    }

    public String ownerUrl(String taskId) {
        return coordinator.ownerUrl(taskId);
    }

    private void stop(boolean swallowException) {
        log.trace("Stopping the Connect group member.");
        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        this.stopped = true;
        ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
        ClientUtils.closeQuietly(client, "consumer network client", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
        if (firstException.get() != null && !swallowException) {
            throw new KafkaException("Failed to stop the Connect group member", firstException.get());
        } else {
            log.debug("The Connect group member has stopped.");
        }
    }
}
